New York taxis trips

This homework is about New York taxi trips. Here is something from Todd Schneider:

The New York City Taxi & Limousine Commission has released a detailed historical dataset covering over 1 billion individual taxi trips in the city from January 2009 through December 2019. Taken as a whole, the detailed trip-level data is more than just a vast list of taxi pickup and drop off coordinates: it's a story of a City. How bad is the rush hour traffic from Midtown to JFK? Where does the Bridge and Tunnel crowd hang out on Saturday nights? What time do investment bankers get to work? How has Uber changed the landscape for taxis? The dataset addresses all of these questions and many more.

The NY taxi trips dataset has been plowed by series of distinguished data scientists. The dataset is available from on Amazon S3 (Amazon's cloud storage service). The link for each file has the following form:

https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_{year}-{month}.csv

There is one CSV file for each NY taxi service (yellow, green, fhv) and each calendar month (replacing {year} and {month} by the desired ones). Each file is moderately large, a few gigabytes. The full dataset is relatively large if it has to be handled on a laptop (several hundred gigabytes).

You will focus on the yellow taxi service and a pair of months, from year 2015 and from year 2018. Between those two years, for hire vehicles services have taken off and carved a huge marketshare.

Whatever the framework you use, CSV files prove hard to handle. After downloading the appropriate files (this takes time, but this is routine), a first step will consist in converting the csv files into a more Spark friendly format such as parquet.

Saving into one of those formats require decisions about bucketing, partitioning and so on. Such decisions influence performance. It is your call. Many people have been working on this dataset, to cite but a few:

Depending on your internet connection, download the files corresponding to "yellow" taxis for the years 2015 and 2018. Download at least one month (the same) for 2015 and 2018, if you can download all of them.

Hint. The 12 csv for 2015 are about 23GB in total, but the corresponding parquet file, if you can create it for all 12 months, is only about 3GB.

You might need the following stuff in order to work with GPS coordinates and to plot things easily.

In [3]:
#!pip install geojson geopandas plotly geopy
In [6]:
#!pip install descartes contextily ipyleaflet
In [1]:
#This is only to output every operation line in each block
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "last"

# import the usual suspects
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
import json
import requests
from pathlib import Path
import sys
import timeit
import calendar

#%matplotlib inline
import seaborn as sns
sns.set_context("notebook", font_scale=1.2)

# Plotly
import plotly
import plotly.express as px
import plotly.graph_objs as go
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
init_notebook_mode(connected=True)
import plotly.figure_factory as ff
from plotly.subplots import make_subplots

# Ipyleaflet
import ipyleaflet
from ipyleaflet import Map, basemaps, Heatmap, linear
from random import uniform

#Geopandas
import geopandas
import contextily as ctx

# Spark
import pyspark.sql.functions as fn
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col
from pyspark.sql.catalog import Catalog
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
In [2]:
conf = SparkConf().setAppName("NYC Taxis")
sc = SparkContext(conf=conf)
In [5]:
# Setting the amount of RAM memory for every executor in the Spark session to 12gb
# Normally we dont want to use all available memory in our computer but at least 1gb less than the max we have
# In my case I have 15,5 gb of RAM so using 12gb allows for 3.5gb for the rest of the processes being run
spark = (SparkSession
     .builder
     .config("spark.executor.memory", "4gb")
     .config("spark.driver.memory", "4gb")   
     .appName("NYC_Analysis")
     .getOrCreate())
In [4]:
# We put this, because the first time we create a session the memory parameters are not configured
# So we need to stop and re run the session again so that the values for executor.memory and driver.memory
# appear in the session
spark.stop()

For this homework we will let you decide on the tools to use (expected for Spark) and to find out information all by yourself (but don't hesitate to ask questions on the slack channel).

Loading data as parquet files

We want to organize the data on a per year and per service basis. We want to end up with one parquet file for each year and each taxi service, since parquet is much better than CSV files.

Hint. Depending on your internet connection and your laptop, you can use only the "yellow" service and use one month of 2015 and 2018

CSV files can contain corrupted lines. You may have to work in order to perform ETL (Extract-Transform-Load) in order obtain a properly typed data frame.

You are invited to proceed as follows:

  1. Try to read the CSV file without imposing a schema.
  2. Inspect the inferred schema. Do you agree with Spark's typing decision?
  3. Eventually correct the schema and read again the data
  4. Save the data into parquet files
  5. In the rest of your work, you will only use the parquet files you created, not the csv files (don't forget to choose a partitioning column and a number of partitions when creating the parquet files).

Hint. Don't forget to ask Spark to use all the memory and ressources from your computer.

Hint. Don't foreget that you should specify a partitioning column and a number of partitions when creating the parquet files.

Hint. Note that the schemas of the 2015 and 2018 data are different...

Hint. When working on this, ask you and answer to the following questions:

  1. What is the StorageLevel of the dataframe after reading the csv files?
  2. What is the number of partitions of the dataframe?

  3. Is it possible to tune this number at loading time?

  4. Why would we want to modify the number of partitions when creating the parquet files?
In [6]:
# Loading months of June and July for 2015
df_15_06 = spark.read\
             .format('csv')\
             .option("header", "true")\
             .option("mode", "FAILFAST")\
             .option("inferSchema", "true")\
             .option("sep", ",")\
             .load("yellow_tripdata_2015-06.csv")

df_15_07 = spark.read\
             .format('csv')\
             .option("header", "true")\
             .option("mode", "FAILFAST")\
             .option("inferSchema", "true")\
             .option("sep", ",")\
             .load("yellow_tripdata_2015-07.csv")

df_15_06.printSchema()
df_15_07.printSchema()
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RateCodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)

In [7]:
# Dataframe union for year 2015 months June and July
df_15_csv = df_15_06.union(df_15_07)
In [8]:
# Rounding the columns for coordinates to be able to make more efficient groupings and filtering of zones
# Adding columns Day Month and Year for partition of parquet files
df_15_csv = df_15_csv\
                    .withColumn("pickup_longitude", fn.round("pickup_longitude", 3))\
                    .withColumn("pickup_latitude", fn.round("pickup_latitude",3))\
                    .withColumn("dropoff_longitude", fn.round("dropoff_longitude",3))\
                    .withColumn("dropoff_latitude", fn.round("dropoff_latitude",3))\
                    .withColumn("Day", fn.dayofmonth("tpep_pickup_datetime"))\
                    .withColumn("Month", fn.month("tpep_pickup_datetime"))\
                    .withColumn("Year", fn.year("tpep_pickup_datetime"))
In [17]:
df_15_csv.printSchema()
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RateCodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Year: integer (nullable = true)

In [11]:
# Loading months of June and July for 2018
df_18_06 = spark.read\
             .format('csv')\
             .option("header", "true")\
             .option("mode", "FAILFAST")\
             .option("inferSchema", "true")\
             .option("sep", ",")\
             .load("yellow_tripdata_2018-06.csv")

df_18_07 = spark.read\
             .format('csv')\
             .option("header", "true")\
             .option("mode", "FAILFAST")\
             .option("inferSchema", "true")\
             .option("sep", ",")\
             .load("yellow_tripdata_2018-07.csv")

df_18_06.printSchema()
df_18_07.printSchema()
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)

In [12]:
# Dataframe union for year 2018 months June and July
df_18_csv = df_18_06.union(df_18_07)
In [13]:
# Adding columns Day Month and Year for partition of parquet files
df_18_csv = df_18_csv.withColumn("Day", fn.dayofmonth("tpep_pickup_datetime"))\
                    .withColumn("Month", fn.month("tpep_pickup_datetime"))\
                    .withColumn("Year", fn.year("tpep_pickup_datetime"))
In [14]:
# Filtering of columns due to corrupt data in year 2018 where there are more months than expected
df_18_csv = df_18_csv.where("(Month = 6 OR Month = 7) AND Year = 2018")
In [10]:
# Creation of parquet file for 2015 yellow taxi data, we partition by month and day
df_15_csv.repartition("Month", "Day").write.partitionBy("Month", "Day").parquet("parquets/yellow_tripdata_2015.parquet")
In [15]:
# Creation of parquet file for 2018 yellow taxi data, we partition by month and day
df_18_csv.repartition("Month", "Day").write.partitionBy("Month", "Day").parquet("parquets/yellow_tripdata_2018.parquet")

Investigate (at least) one month of data in 2015

From now on, you will be using the parquet files you created for 2015.

We shall visualize several features of taxi traffic during one calendar month in 2015 and the same calendar month in 2018.

Hint. In order to build appealing graphics, you may stick to matplotlib + seaborn, you can use also plotly, which is used a lot to build interactive graphics, but you can use whatever you want.

In [6]:
df_15_06_pds = pd.read_parquet("parquets/yellow_tripdata_2015.parquet/Month=6")
In [7]:
df_15_06_spark = spark.read.parquet("parquets/yellow_tripdata_2015.parquet/Month=6")
In [84]:
#df_2015_June.select("VendorID", "passenger_count", "tpep_pickup_datetime").show()

The following longitudes and lattitudes encompass Newark and JFK airports, Northern Manhattan and Verazzano bridge.

In [8]:
long_min = -74.10
long_max = -73.70
lat_min = 40.58
lat_max = 40.90
  1. Using these boundaries, filter the 2015 data (using pickup and dropoff longitude and latitude) and count the number of trips for each value of passenger_count and make a plot of that.
In [9]:
query1 = f"""
(pickup_longitude BETWEEN {long_min} AND {long_max} AND pickup_latitude BETWEEN {lat_min} AND {lat_max}) 
AND (dropoff_longitude BETWEEN {long_min} AND {long_max} AND dropoff_latitude BETWEEN {lat_min} AND {lat_max})"""
df_coordinates = df_15_06_spark.where(query1)
In [10]:
filtered_count = df_coordinates.groupBy("passenger_count").count().toPandas()
In [11]:
fig = px.bar(filtered_count, x='passenger_count', y='count', hover_data=['count', 'passenger_count'], 
             color='count',
             labels={'passenger_count':'# of passengers', 'count':'# of trips'}, height=600,
             color_continuous_scale=px.colors.sequential.Viridis
            )

fig.update_layout(
    title="Number of trips for different passenger occupancies",
    xaxis_title="Number of passengers",
    yaxis_title="Number of trips")

fig.show()

Trips with $0$ or larger than $7$ passengers are pretty rare. We suspect these to be outliers. We need to explore these trips further in order order to understand what might be wrong with them

In [26]:
zero_passenger_df = df_2015_June.where("passenger_count = 0").toPandas()
plus6_passenger_df = df_2015_June.where("passenger_count > 6").toPandas()
  1. What's special with trips with zero passengers?

They should not exist since a taxi with no passengers is just a car.

In [15]:
zero_passenger_df
Out[15]:
VendorID tpep_pickup_datetime tpep_dropoff_datetime trip_distance pickup_longitude pickup_latitude RateCodeID store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount Day passenger_count
0 1 2015-06-09 00:17:15 2015-06-09 00:17:18 0.00 -73.987358 40.731770 5 N -73.987350 40.731777 1 20.80 0.0 0.0 1.50 0.0 0.3 22.60 9 0
1 1 2015-06-09 00:31:34 2015-06-09 00:35:37 0.70 0.000000 0.000000 1 N 0.000000 0.000000 1 5.00 0.5 0.5 1.50 0.0 0.3 7.80 9 0
2 1 2015-06-09 01:37:51 2015-06-09 01:49:06 2.70 0.000000 0.000000 1 Y 0.000000 0.000000 1 11.00 0.5 0.5 2.45 0.0 0.3 14.75 9 0
3 2 2015-06-09 01:28:03 2015-06-09 01:42:52 5.42 -73.991493 40.750088 5 N -74.043991 40.786510 1 40.00 0.0 0.0 4.00 0.0 0.3 44.30 9 0
4 1 2015-06-09 02:02:50 2015-06-09 02:54:45 17.60 0.000000 0.000000 1 Y 0.000000 0.000000 1 54.00 0.5 0.5 5.00 0.0 0.3 60.30 9 0
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
1549 1 2015-06-24 22:33:24 2015-06-24 22:47:58 4.50 -73.988609 40.751278 5 N -74.044800 40.743198 1 0.01 0.0 0.0 0.00 0.0 0.3 0.31 24 0
1550 1 2015-06-24 23:05:43 2015-06-24 23:11:39 1.10 -73.985756 40.756668 1 N -73.987610 40.744144 2 6.00 0.5 0.5 0.00 0.0 0.3 7.30 24 0
1551 1 2015-06-24 22:45:52 2015-06-24 23:00:48 0.60 -73.988426 40.756374 1 N -73.983902 40.759686 2 10.00 0.5 0.5 0.00 0.0 0.3 11.30 24 0
1552 1 2015-06-24 23:12:40 2015-06-24 23:27:09 3.10 0.000000 0.000000 1 N -74.007591 40.705975 1 12.50 0.5 0.5 2.75 0.0 0.3 16.55 24 0
1553 1 2015-06-24 23:20:01 2015-06-24 23:23:55 0.50 -73.994759 40.724960 1 N -74.002205 40.729496 1 4.50 0.5 0.5 1.00 0.0 0.3 6.80 24 0

1554 rows × 20 columns

  1. What's special with trips with more than $6$ passengers?

They should also not exist since a New York taxi can carry at most 5-6 people.

In [11]:
plus6_passenger_df.head(10)
Out[11]:
VendorID tpep_pickup_datetime tpep_dropoff_datetime trip_distance pickup_longitude pickup_latitude RateCodeID store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount Day passenger_count
0 2 2015-06-07 10:57:37 2015-06-07 10:57:42 0.00 -73.965828 40.795212 5 N -73.965828 40.795212 2 7.20 0.0 0.5 0.00 0.00 0.3 8.00 7 7
1 2 2015-06-07 10:57:37 2015-06-07 10:57:42 0.00 -73.965828 40.795212 5 N -73.965828 40.795212 3 -7.20 0.0 -0.5 0.00 0.00 -0.3 -8.00 7 7
2 2 2015-06-07 13:49:07 2015-06-07 15:08:06 39.85 -73.871040 40.773750 5 N -74.047852 40.781754 1 70.00 0.0 0.5 19.97 29.04 0.3 119.81 7 7
3 2 2015-06-07 23:16:57 2015-06-07 23:16:59 0.00 0.000000 0.000000 5 N -73.974197 40.778740 1 70.00 0.0 0.5 0.00 0.00 0.3 70.80 7 7
4 2 2015-06-07 03:27:48 2015-06-07 03:32:31 0.00 -73.996292 40.753616 5 N -73.996346 40.753643 2 90.00 0.0 0.0 0.00 0.00 0.3 90.30 7 9
5 2 2015-06-07 03:27:48 2015-06-07 03:32:31 0.00 -73.996292 40.753616 5 N -73.996346 40.753643 3 -90.00 0.0 0.0 0.00 0.00 -0.3 -90.30 7 9
6 2 2015-06-03 17:33:24 2015-06-03 17:33:29 0.00 -74.177727 40.690384 5 N -74.177750 40.690403 1 70.00 0.0 0.0 0.00 0.00 0.3 70.30 3 7
7 2 2015-06-03 19:53:11 2015-06-03 19:53:15 0.03 0.000000 0.000000 5 N 0.000000 0.000000 2 70.00 0.0 0.0 0.00 0.00 0.3 70.30 3 7
8 1 2015-06-03 20:49:12 2015-06-03 20:56:50 0.80 -73.977844 40.758900 1 N -73.988213 40.756008 2 6.00 0.5 0.5 0.00 0.00 0.3 7.30 3 7
9 2 2015-06-21 04:57:33 2015-06-21 04:57:37 0.00 -73.994164 40.732151 5 N -73.994164 40.732151 1 7.77 0.0 0.5 0.05 0.00 0.0 8.32 21 8
  1. What is the largest distance travelled during this month? Is it the first taxi on the moon?

The largest distance travelled this month is 10,083,318 miles whilst the distance from Earth to the Moon is 238,900 miles. This means that someone travelled more than 21 times there and back to the moon in one trip!

In [27]:
max_distance = df_coordinates.agg({"trip_distance": "max"}).collect()[0][0]
print("Largest distance travelled this month: {:,} miles".format(max_distance))
print("Distance from Earth to the Moon: 238,900 miles")
Largest distance travelled this month: 10,083,318.0 miles
Distance from Earth to the Moon: 238,900 miles
  1. Plot the distribution of the trip_distance (using an histogram for instance) during year 2015. Focus on trips with non-zero trip distance and trip distance less than 30 miles.
In [317]:
distance_travelling = df_15_06_pds[(df_15_06_pds['trip_distance'] > 0) & (df_15_06_pds['trip_distance'] < 30)]
In [95]:
#distance_travelling
In [364]:
fig = plt.figure(figsize=(10,5))
sns.set()
g = sns.distplot(distance_travelling["trip_distance"], color="purple")

g.set_title("Trip distance distribution for distances between 0 and 30 miles")
plt.xlabel("Distance in miles")
plt.ylabel("Proportion")
plt.show(g)

Let's look at what Spark does for these computations

  1. Use the explain method or have a look at the Spark UI to analyze the job. You should be able to assess
    • Parsed Logical Plan
    • Analyzed Logical Plan
    • Optimized Logical Plan
    • Physical Plan
  2. Do the Analyzed Logical Plan and Optimized Logical Plan differ? Spot the differences if any. How would a RDBMS proceed with such a query?
  3. How does the physical plan differ from the Optimized Logical Plan? What are the keywords you would not expects in a RDBMS? What is their meaning?
  4. Inspect the stages on Spark UI. How many stages are necessary to complete the Spark job? What are the roles of HashAggregate and Exchange hashpartitioning?
  5. Does the physical plan perform shuffle operations? If yes how many?
  6. What are tasks with respect to stages (in Spark language)? How many tasks are your stages made of?

Now, compute the following and produce relevant plots:

  1. Break down the trip distance distribution for each day of week
In [529]:
distance_travelling["Week_day"] = distance_travelling['tpep_pickup_datetime'].dt.day_name()#
In [333]:
fig, axes = plt.subplots(nrows=7 , ncols=1, figsize=(15, 40))
for i, day in enumerate(calendar.day_name[0:]):
    sns.distplot(distance_travelling[distance_travelling["Week_day"] == day]["trip_distance"], ax=axes[i], color="purple")\
    .set(xlabel='Trip Distance', ylabel='Proportion', title=f"{day} trip distribution")
    
fig.subplots_adjust(top=0.9, hspace = .6)
plt.show()
  1. Count the number of distinct pickup location
In [12]:
df_coordinates.select("pickup_longitude", "pickup_latitude").distinct().count()
Out[12]:
25526
  1. Compute and display tips and profits as a function of the pickup location
In [222]:
ny_bor = geopandas.read_file(geopandas.datasets.get_path('nybb'))
ny = geopandas.read_file("BoroughBoundaries.geojson")
In [168]:
bronx = ny["geometry"].iloc[0].bounds
staten = ny["geometry"].iloc[1].bounds
brooklyn = ny["geometry"].iloc[2].bounds
queens = ny["geometry"].iloc[3].bounds
manhattan = ny["geometry"].iloc[4].bounds
In [175]:
#Bronx
bronx_long_min = bronx[0]
bronx_long_max = bronx[2]
bronx_lat_min = bronx[1]
bronx_lat_max = bronx[3]
#Staten
staten_long_min = staten[0]
staten_long_max = staten[2]
staten_lat_min = staten[1]
staten_lat_max = staten[3]
#Brooklyn
brooklyn_long_min = brooklyn[0]
brooklyn_long_max = brooklyn[2]
brooklyn_lat_min = brooklyn[1]
brooklyn_lat_max = brooklyn[3]
#Queens
queens_long_min = queens[0]
queens_long_max = queens[2]
queens_lat_min = queens[1]
queens_lat_max = queens[3]
#Manhattan
manhattan_long_min = manhattan[0]
manhattan_long_max = manhattan[2]
manhattan_lat_min = manhattan[1]
manhattan_lat_max = manhattan[3]
In [179]:
querybronx = f"(pickup_longitude BETWEEN {bronx_long_min} AND {bronx_long_max} AND pickup_latitude BETWEEN {bronx_lat_min} AND {bronx_lat_max})"

queryqueens = f"(pickup_longitude BETWEEN {queens_long_min} AND {queens_long_max} AND pickup_latitude BETWEEN {queens_lat_min} AND {queens_lat_max})"

querymanhattan = f"(pickup_longitude BETWEEN {manhattan_long_min} AND {manhattan_long_max} AND pickup_latitude BETWEEN {manhattan_lat_min} AND {manhattan_lat_max})"

querystaten = f"(pickup_longitude BETWEEN {staten_long_min} AND {staten_long_max} AND pickup_latitude BETWEEN {staten_lat_min} AND {staten_lat_max})"

querybrooklyn = f"(pickup_longitude BETWEEN {brooklyn_long_min} AND {brooklyn_long_max} AND pickup_latitude BETWEEN {brooklyn_lat_min} AND {brooklyn_lat_max})"
In [212]:
distinct_profits = df_coordinates.groupBy("pickup_longitude", "pickup_latitude").agg({"total_amount":"sum", "tip_amount":"sum"})
In [213]:
df_bronx = distinct_profits.where(querybronx).withColumn("zone", fn.lit("Bronx"))
df_manhattan = distinct_profits.where(querymanhattan).withColumn("zone", fn.lit("Manhattan"))
df_queens = distinct_profits.where(queryqueens).withColumn("zone", fn.lit("Queens"))
df_brooklyn = distinct_profits.where(querybrooklyn).withColumn("zone", fn.lit("Brooklyn"))
df_staten = distinct_profits.where(querystaten).withColumn("zone", fn.lit("Staten Island"))
In [214]:
boroughs = df_bronx.union(df_manhattan)
boroughs = boroughs.union(df_queens)
boroughs = boroughs.union(df_brooklyn)
boroughs = boroughs.union(df_staten)
In [215]:
boroughs = boroughs.groupBy("zone").agg({"sum(total_amount)":"sum", "sum(tip_amount)":"sum"})
In [216]:
boroughs = boroughs.toPandas()
In [217]:
boroughs = boroughs.rename(columns={"sum(sum(tip_amount))": "tips", "sum(sum(total_amount))": "profits"})
In [286]:
fig = go.Figure()
fig.add_trace(go.Scatter(x=super_union["zone"], y=super_union["tips"],
                    mode='lines+markers',
                    name='Tips',
                        line_shape='spline'))
fig.add_trace(go.Scatter(x=super_union["zone"], y=super_union["profits"],
                    mode='lines+markers',
                    name='Profits',
                        line_shape='spline'))

fig.update_layout(
    title="Tips and profits for each NY Borough",
    xaxis_title="New York City Boroughs",
    yaxis_title="Dollars")


fig.show()

Investigate one month of trips data in 2015 and 2018

Consider one month of trips data from yellow taxis for each year

  1. Filter and cache/persist the result
In [227]:
df_15_06_spark = spark.read.parquet("parquets/yellow_tripdata_2015.parquet/Month=6")
df_18_06_spark = spark.read.parquet("parquets/yellow_tripdata_2018.parquet/Month=6")

Assessing seasonalities and looking at time series

Compute and plot the following time series indexed by day of the week and hour of day:

  1. The number of pickups
In [228]:
df_15_06_spark = df_15_06_spark.withColumn("week_day", fn.date_format("tpep_pickup_datetime", "EEEE")).withColumn("Hour", fn.hour("tpep_pickup_datetime"))
df_18_06_spark = df_18_06_spark.withColumn("week_day", fn.date_format("tpep_pickup_datetime", "EEEE")).withColumn("Hour", fn.hour("tpep_pickup_datetime"))
In [229]:
number_pickups_weekday_hour_15 = df_15_06_spark.groupby("week_day", "Hour").count()
number_pickups_weekday_hour_18 = df_18_06_spark.groupby("week_day", "Hour").count()
In [230]:
pickups_df_15 = number_pickups_weekday_hour_15.toPandas()
pickups_df_18 = number_pickups_weekday_hour_18.toPandas()
In [231]:
# This is so the days of the week are shown in the correct order in the plot
pickups_df_15["week_day"] = pd.Categorical(pickups_df_15.week_day, categories=calendar.day_name[0:], ordered=True)
pickups_df_15 = pickups_df_15.sort_values(["week_day","Hour"])
In [232]:
# This is so the days of the week are shown in the correct order in the plot
pickups_df_18["week_day"] = pd.Categorical(pickups_df_18.week_day, categories=calendar.day_name[0:], ordered=True)
pickups_df_18 = pickups_df_18.sort_values(["week_day","Hour"])
In [238]:
fig = px.line(pickups_df_15, x='Hour', y='count', color = "week_day", line_shape='spline',
              labels={'count':'# of pickups', 'week_day':'Day '})

fig.update_layout(
    title="Number of pickups per hour for every day of the week in June 2015",
    xaxis_title="Hour of the day",
    yaxis_title="Number of pickups",
    legend_title_text='Day of the week'
)

fig.show()
In [239]:
fig = px.line(pickups_df_18, x='Hour', y='count', color = "week_day", line_shape='spline',
              labels={'count':'# of pickups', 'week_day':'Day '})

fig.update_layout(
    title="Number of pickups per hour for every day of the week in June 2018",
    xaxis_title="Hour of the day",
    yaxis_title="Number of pickups",
    legend_title_text='Day of the week'
)

fig.show()
  1. The average fare
In [241]:
avg_fare_spark_15 = df_15_06_spark.groupby("week_day", "Hour").agg({"total_amount": "avg"})
avg_fare_spark_18 = df_18_06_spark.groupby("week_day", "Hour").agg({"total_amount": "avg"})
In [242]:
avg_fare_df_15 = avg_fare_spark_15.toPandas()
avg_fare_df_18 = avg_fare_spark_18.toPandas()
In [243]:
# This is so the days of the week are shown in the correct order in the plot
avg_fare_df_15["week_day"] = pd.Categorical(avg_fare_df_15.week_day, categories=calendar.day_name[0:], ordered=True)
avg_fare_df_15 = avg_fare_df_15.sort_values(["week_day","Hour"])
In [244]:
# This is so the days of the week are shown in the correct order in the plot
avg_fare_df_18["week_day"] = pd.Categorical(avg_fare_df_18.week_day, categories=calendar.day_name[0:], ordered=True)
avg_fare_df_18 = avg_fare_df_18.sort_values(["week_day","Hour"])
In [245]:
fig = px.line(avg_fare_df_15, x='Hour', y='avg(total_amount)', color = "week_day", line_shape='spline',
             labels={'avg(total_amount)':'avg fare', 'week_day':'Day '})

fig.update_layout(
    title="Average fare per hour for every day of the week in June 2015",
    xaxis_title="Hour of the day",
    yaxis_title="Average fare in dollars",
    legend_title_text='Day of the week'
)

fig.show()
In [246]:
fig = px.line(avg_fare_df_18, x='Hour', y='avg(total_amount)', color = "week_day",line_shape='spline',
             labels={'avg(total_amount)':'avg fare', 'week_day':'Day '})

fig.update_layout(
    title="Average fare per hour for every day of the week in June 2018",
    xaxis_title="Hour of the day",
    yaxis_title="Average fare in dollars",
    legend_title_text='Day of the week'
)

fig.show()
  1. The average trip duration
In [247]:
duration_spark_15 = df_15_06_spark.withColumn("duration", (fn.col("tpep_dropoff_datetime").cast("long") - fn.col("tpep_pickup_datetime").cast("long"))/60)
duration_spark_18 = df_18_06_spark.withColumn("duration", (fn.col("tpep_dropoff_datetime").cast("long") - fn.col("tpep_pickup_datetime").cast("long"))/60)
In [248]:
avg_trip_spark_15 = duration_spark_15.groupby("week_day", "Hour").agg({"duration": "avg"})
avg_trip_spark_18 = duration_spark_18.groupby("week_day", "Hour").agg({"duration": "avg"})
In [249]:
avg_trip_df_15 = avg_trip_spark_15.toPandas()
avg_trip_df_18 = avg_trip_spark_18.toPandas()
In [250]:
# This is so the days of the week are shown in the correct order in the plot
avg_trip_df_15["week_day"] = pd.Categorical(avg_trip_df_15.week_day, categories=calendar.day_name[0:], ordered=True)
avg_trip_df_15 = avg_trip_df_15.sort_values(["week_day","Hour"])
In [251]:
# This is so the days of the week are shown in the correct order in the plot
avg_trip_df_18["week_day"] = pd.Categorical(avg_trip_df_18.week_day, categories=calendar.day_name[0:], ordered=True)
avg_trip_df_18 = avg_trip_df_18.sort_values(["week_day","Hour"])
In [253]:
fig = px.line(avg_trip_df_15, x='Hour', y='avg(duration)', color = "week_day", line_shape='spline',
             labels={'avg(duration)':'avg trip duration ', 'week_day':'Day '})

fig.update_layout(
    title="Average trip duration per hour for every day of the week in June 2015",
    xaxis_title="Hour of the day",
    yaxis_title="Average trip duration in minutes",
    legend_title_text='Day of the week'
)

fig.show()
In [254]:
fig = px.line(avg_trip_df_18, x='Hour', y='avg(duration)', color = "week_day", line_shape='spline',
             labels={'avg(duration)':'avg trip duration ', 'week_day':'Day '})

fig.update_layout(
    title="Average trip duration per hour for every day of the week in June 2018",
    xaxis_title="Hour of the day",
    yaxis_title="Average trip duration in minutes",
    legend_title_text='Day of the week'
)

fig.show()
  1. Plot the average number of ongoing trips
In [268]:
avg_ongoing_trips_15 = df_15_06_spark.groupBy("Day", "Hour").count()
avg_ongoing_trips_18 = df_18_06_spark.groupBy("Day", "Hour").count()
In [272]:
avg_ongoing_trips_15 = avg_ongoing_trips_15.groupBy("Hour").agg({"count":"avg"})
avg_ongoing_trips_18 = avg_ongoing_trips_18.groupBy("Hour").agg({"count":"avg"})
In [273]:
avg_ongoing_trips_15 = avg_ongoing_trips_15.toPandas()
avg_ongoing_trips_18 = avg_ongoing_trips_18.toPandas()
In [278]:
avg_ongoing_trips_15 = avg_ongoing_trips_15.sort_values(by='Hour', ascending=True)
avg_ongoing_trips_18 = avg_ongoing_trips_18.sort_values(by='Hour', ascending=True)
In [283]:
fig = go.Figure()
fig.add_trace(go.Scatter(x=avg_ongoing_trips_15["Hour"], y=avg_ongoing_trips_15["avg(count)"],
                    mode='lines+markers',
                    name='2015',
                        line_shape='spline'))
fig.add_trace(go.Scatter(x=avg_ongoing_trips_18["Hour"], y=avg_ongoing_trips_18["avg(count)"],
                    mode='lines+markers',
                    name='2018',
                        line_shape='spline'))

fig.update_layout(
    title="Average number of ongoing trips per hour in June",
    legend_title_text='Year',
    xaxis_title="Hour of the day",
    yaxis_title="Number of trips")


fig.show()

Rides to the airports

In order to find the longitude and lattitude of JFK and Newark airport as well as the longitude and magnitudes of Manhattan, you can use a service like geojson.io. Plot the following time series, indexed the day of the week and hour of the day

In [287]:
#Exact JFK coordinates
JFK_long = -73.78606796264648
JFK_lat = 40.64274482191706

#JFK coordinates
jfk_long_min = -73.83
jfk_long_max = -73.74

jfk_lat_min = 40.62
jfk_lat_max = 40.67

jfk_id = 132
# Midtown coordinates
midtown_long_min = -74.027
midtown_long_max = -73.95

midtown_lat_min = 40.725
midtown_lat_max = 40.77

midtown_ids = (224, 164, 107, 90, 246, 68, 48, 163, 162, 229, 50, 230)

# Function defined to calculate the median of a list of values
def median(values_list):
    med = np.median(values_list)
    return float(med)

udf_median = fn.udf(median, FloatType())
  1. Median duration of taxi trip leaving Midtown (Southern Manhattan) headed for JFK Airport
In [288]:
queryMid_JFK_15 = f"(pickup_longitude BETWEEN {midtown_long_min} AND {midtown_long_max} AND pickup_latitude BETWEEN {midtown_lat_min} AND {midtown_lat_max}) AND (dropoff_longitude BETWEEN {jfk_long_min} AND {jfk_long_max} AND dropoff_latitude BETWEEN {jfk_lat_min} AND {jfk_lat_max})"
midtown_jfk_df_15 = df_15_06_spark.where(queryMid_JFK_15)
In [289]:
queryMid_JFK_18 = f"PULocationID IN {midtown_ids} AND DOLocationID = {jfk_id}"
midtown_jfk_df_18 = df_18_06_spark.where(queryMid_JFK_18)
In [290]:
midtown_jfk_df_15 = midtown_jfk_df_15.withColumn("duration", (fn.col("tpep_dropoff_datetime").cast("long") - fn.col("tpep_pickup_datetime").cast("long"))/60)
midtown_jfk_df_18 = midtown_jfk_df_18.withColumn("duration", (fn.col("tpep_dropoff_datetime").cast("long") - fn.col("tpep_pickup_datetime").cast("long"))/60)
In [291]:
median_trip_MID_JFK_spark_15 = midtown_jfk_df_15.groupby("week_day", "Hour").agg(udf_median(fn.collect_list(col("duration"))).alias('median_duration'))
median_trip_MID_JFK_spark_18 = midtown_jfk_df_18.groupby("week_day", "Hour").agg(udf_median(fn.collect_list(col("duration"))).alias('median_duration'))
In [292]:
med_trip_MID_JFK_df_15 = median_trip_MID_JFK_spark_15.toPandas()
med_trip_MID_JFK_df_18 = median_trip_MID_JFK_spark_18.toPandas()
In [293]:
# This is so the days of the week are shown in the correct order in the plot
med_trip_MID_JFK_df_15["week_day"] = pd.Categorical(med_trip_MID_JFK_df_15.week_day, categories=calendar.day_name[0:], ordered=True)
med_trip_MID_JFK_df_15 = med_trip_MID_JFK_df_15.sort_values(["week_day","Hour"])
In [294]:
# This is so the days of the week are shown in the correct order in the plot
med_trip_MID_JFK_df_18["week_day"] = pd.Categorical(med_trip_MID_JFK_df_18.week_day, categories=calendar.day_name[0:], ordered=True)
med_trip_MID_JFK_df_18 = med_trip_MID_JFK_df_18.sort_values(["week_day","Hour"])
In [297]:
fig = px.line(med_trip_MID_JFK_df_15, x='Hour', y='median_duration', color = "week_day",line_shape='spline',
             labels={'median_duration':'median trip duration', 'week_day':'Day '})

fig.update_layout(
    title="Median trip duration from Midtown to JFK airport per day per hour | June 2015",
    xaxis_title="Hour of the day",
    yaxis_title="Median duration in minutes",
    legend_title_text='Day of the week'
)

fig.show()
In [298]:
fig = px.line(med_trip_MID_JFK_df_18, x='Hour', y='median_duration', color = "week_day",line_shape='spline',
             labels={'median_duration':'median trip duration', 'week_day':'Day '})

fig.update_layout(
    title="Median trip duration from Midtown to JFK airport per day per hour | June 2018",
    xaxis_title="Hour of the day",
    yaxis_title="Median duration in minutes",
    legend_title_text='Day of the week'
)

fig.show()
  1. Median taxi duration of trip leaving from JFK Airport to Midtown (Southern Manhattan)
In [299]:
queryJFK_MID_15 = f"(pickup_longitude BETWEEN {jfk_long_min} AND {jfk_long_max} AND pickup_latitude BETWEEN {jfk_lat_min} AND {jfk_lat_max}) AND (dropoff_longitude BETWEEN {midtown_long_min} AND {midtown_long_max} AND dropoff_latitude BETWEEN {midtown_lat_min} AND {midtown_lat_max})"
jfk_midtown_df_15= df_15_06_spark.where(queryJFK_MID_15)
In [300]:
queryJFK_MID_18 = f"PULocationID = {jfk_id} AND DOLocationID IN {midtown_ids}"
jfk_midtown_df_18 = df_18_06_spark.where(queryJFK_MID_18)
In [301]:
jfk_midtown_df_15 = jfk_midtown_df_15.withColumn("duration", (fn.col("tpep_dropoff_datetime").cast("long") - fn.col("tpep_pickup_datetime").cast("long"))/60)
jfk_midtown_df_18 = jfk_midtown_df_18.withColumn("duration", (fn.col("tpep_dropoff_datetime").cast("long") - fn.col("tpep_pickup_datetime").cast("long"))/60)
In [302]:
median_trip_JFK_MID_spark_15 = jfk_midtown_df_15.groupby("week_day", "Hour").agg(udf_median(fn.collect_list(col("duration"))).alias('median_duration'))
median_trip_JFK_MID_spark_18 = jfk_midtown_df_18.groupby("week_day", "Hour").agg(udf_median(fn.collect_list(col("duration"))).alias('median_duration'))
In [303]:
median_trip_JFK_MID_df_15 = median_trip_JFK_MID_spark_15.toPandas()
median_trip_JFK_MID_df_18 = median_trip_JFK_MID_spark_18.toPandas()
In [304]:
median_trip_JFK_MID_df_15["week_day"] = pd.Categorical(median_trip_JFK_MID_df_15.week_day, categories=calendar.day_name[0:], ordered=True)
median_trip_JFK_MID_df_15 = median_trip_JFK_MID_df_15.sort_values(["week_day","Hour"])
In [305]:
median_trip_JFK_MID_df_18["week_day"] = pd.Categorical(median_trip_JFK_MID_df_18.week_day, categories=calendar.day_name[0:], ordered=True)
median_trip_JFK_MID_df_18 = median_trip_JFK_MID_df_18.sort_values(["week_day","Hour"])
In [308]:
fig = px.line(median_trip_JFK_MID_df_15, x='Hour', y='median_duration', color = "week_day",line_shape='spline',
             labels={'median_duration':'median trip duration', 'week_day':'Day '})

fig.update_layout(
    title="Median trip duration from JFK airport to Midtown per day per hour | June 2015",
    xaxis_title="Hour of the day",
    yaxis_title="Median duration in minutes",
    legend_title_text='Day of the week'
)

fig.show()
In [309]:
fig = px.line(median_trip_JFK_MID_df_18, x='Hour', y='median_duration', color = "week_day", line_shape='spline',
             labels={'median_duration':'median trip duration', 'week_day':'Day '})

fig.update_layout(
    title="Median trip duration from JFK airport to Midtown per day per hour | June 2018",
    xaxis_title="Hour of the day",
    yaxis_title="Median duration in minutes",
    legend_title_text='Day of the week'
)

fig.show()

Geographic information

For this, you will need to find tools to display maps and to build choropleth maps. We let you look and find relevant tools to do this.

  1. Build a heatmap where color is a function of

A. number of pickups

In [310]:
total_pickups_15_06 = df_15_06_spark.groupBy("pickup_latitude", "pickup_longitude").count()
In [311]:
total_pickups_15_06_df = total_pickups_15_06.toPandas()
In [312]:
#total_pickups_2015_06_df
In [313]:
locationsA = total_pickups_15_06_df.values.tolist()
In [314]:
mapA = Map(center=(40.67, -73.94), zoom=8)

heatmap = Heatmap(locations=locationsA,radius=8, blur =15)

mapA.add_layer(heatmap);

mapA

B. number of dropoffs

In [315]:
total_dropoffs_15_06 = df_15_06_spark.groupBy("dropoff_latitude", "dropoff_longitude").count()
In [316]:
total_dropoffs_15_06.count()
Out[316]:
66366
In [317]:
total_dropoffs_15_06_df = total_dropoffs_15_06.toPandas()
In [318]:
#total_dropoffs_2015_06_df
In [319]:
locationsB = total_dropoffs_15_06_df.values.tolist()
In [320]:
mapB = Map(center=(40.67, -73.94), zoom=8)

heatmap = Heatmap(locations=locationsB,radius=8, blur =15)

mapB.add_layer(heatmap);

mapB

C. number of pickups with dropoff at some airport (JFK, LaGuardia, Newark)

In [321]:
#Newark coordinates
newark_long_min =-74.202
newark_long_max = -74.145
newark_lat_min = 40.66
newark_lat_max = 40.711

#LaGuardia coordinates
laguardia_long_min = -73.89
laguardia_long_max = -73.85
laguardia_lat_min = 40.764
laguardia_lat_max = 40.785
In [322]:
airport_dropoff_query = f"""
(dropoff_longitude BETWEEN {jfk_long_min} AND {jfk_long_max} AND dropoff_latitude BETWEEN {jfk_lat_min} AND {jfk_lat_max})
OR
(dropoff_longitude BETWEEN {newark_long_min} AND {newark_long_max} AND dropoff_latitude BETWEEN {newark_lat_min} AND {newark_lat_max})
OR
(dropoff_longitude BETWEEN {laguardia_long_min} AND {laguardia_long_max} AND dropoff_latitude BETWEEN {laguardia_lat_min} AND {laguardia_lat_max})
"""

airport_dropoffs_df = df_15_06_spark.where(airport_dropoff_query)
In [323]:
#airport_dropoffs_df.count()
In [324]:
pickups_to_airport = airport_dropoffs_df.groupBy("pickup_latitude", "pickup_longitude").count()
In [325]:
#pickups_to_airport.count()
In [326]:
pickups_to_airport_df = pickups_to_airport.toPandas()
In [327]:
#pickups_to_airport_df
In [328]:
locationsC = pickups_to_airport_df.values.tolist()
In [329]:
mapC = Map(center=(40.67, -73.94), zoom=8)

heatmap = Heatmap(locations=locationsC,radius=8, blur =15)

mapC.add_layer(heatmap);

mapC
  1. Build a choropleth map where color is a function of
In [330]:
pdf_18 = spark.read.parquet("parquets/yellow_tripdata_2018.parquet")
In [331]:
zones = geopandas.read_file("NYCTaxiZones.geojson")
zones.head()
Out[331]:
shape_area objectid shape_leng location_id zone borough geometry
0 0.0007823067885 1 0.116357453189 1 Newark Airport EWR MULTIPOLYGON (((-74.18445 40.69500, -74.18449 ...
1 0.00486634037837 2 0.43346966679 2 Jamaica Bay Queens MULTIPOLYGON (((-73.82338 40.63899, -73.82277 ...
2 0.000314414156821 3 0.0843411059012 3 Allerton/Pelham Gardens Bronx MULTIPOLYGON (((-73.84793 40.87134, -73.84725 ...
3 0.000111871946192 4 0.0435665270921 4 Alphabet City Manhattan MULTIPOLYGON (((-73.97177 40.72582, -73.97179 ...
4 0.000497957489363 5 0.0921464898574 5 Arden Heights Staten Island MULTIPOLYGON (((-74.17422 40.56257, -74.17349 ...
In [332]:
zones_info = zones[["objectid", "zone"]]
zones_info["objectid"] = zones_info["objectid"].astype(int)

A. number of pickups in the area

In [333]:
pickups_18 = pdf_18.groupBy("PULocationID").count()
In [334]:
pickups_18_df = pickups_18.toPandas()
In [335]:
pickups_18_df = pd.merge(pickups_18_df, zones_info, how='left', left_on='PULocationID', right_on='objectid')
pickups_18_df = pickups_18_df[["PULocationID", "count", "zone"]]
In [336]:
pickups_18_df.head()
Out[336]:
PULocationID count zone
0 148 226127 Lower East Side
1 243 3515 Washington Heights North
2 31 103 Bronx Park
3 137 212890 Kips Bay
4 85 572 Erasmus
In [337]:
fig = px.choropleth(pickups_18_df, geojson=zones, 
                    locations='PULocationID', color='count',
                    hover_name = 'zone',
                    labels={'PULocationID':'Taxi zone', 'count': 'Pickups'}
                        
                   )
fig.update_geos(fitbounds="locations", visible=True)

fig.update_layout(title_text = 'Number of pickups per NYC Taxi area')

fig.show() 

B. ratio of number of payments by card/number of cash payments for pickups in the area

In [338]:
payment_ratio_18 = pdf_18.groupBy("PULocationID","payment_type").count()
In [339]:
t1 = payment_ratio_18.where("payment_type = 1").select("PULocationID", fn.col("count").alias("t1"))
In [340]:
t2 = payment_ratio_18.where("payment_type = 2").select("PULocationID", fn.col("count").alias("t2"))
In [341]:
payment_types = t1.join(t2, "PULocationID")
payment_types = payment_types.withColumn("ratio", fn.round(fn.col("t2")/fn.col("t1"), 3))
In [342]:
#payment_types.show()
In [343]:
payment_ratio = payment_types.toPandas()
In [344]:
payment_ratio = pd.merge(payment_ratio, zones_info, how='left', left_on='PULocationID', right_on='objectid')
payment_ratio = payment_ratio[["PULocationID", "ratio", "zone"]]
In [345]:
payment_ratio.head()
Out[345]:
PULocationID ratio zone
0 148 0.365 Lower East Side
1 243 0.778 Washington Heights North
2 31 0.852 Bronx Park
3 85 0.545 Erasmus
4 137 0.376 Kips Bay
In [346]:
fig = px.choropleth(payment_ratio, geojson=zones, 
                    locations='PULocationID', color='ratio',
                    hover_name = 'zone',
                    labels={'PULocationID':'Taxi zone', 'ratio': 'Card/Cash ratio'}    
                   )

fig.update_geos(fitbounds="locations", visible=True)

fig.update_layout(title_text = 'Card/Cash payment ratio per NYC Taxi area')

fig.show() 

C. ratio of total fare/trip duration for dropoff in the area

In [347]:
pdf_18 = pdf_18.withColumn("duration", (fn.col("tpep_dropoff_datetime").cast("long") - fn.col("tpep_pickup_datetime").cast("long"))/60)
In [348]:
doffs_total_fare_18 = pdf_18.groupBy("DOLocationID",).agg({"total_amount":"sum"})
doffs_total_duration_18 = pdf_18.groupBy("DOLocationID",).agg({"duration":"sum"})
In [349]:
ratio_fare_duration = doffs_total_fare_18.join(doffs_total_duration_18, "DOLocationID")
In [350]:
ratio_fare_duration = ratio_fare_duration.withColumn("ratio", fn.col("sum(total_amount)")/fn.col("sum(duration)"))
In [351]:
ratio_fare_duration = ratio_fare_duration.toPandas()
In [352]:
ratio_fare_duration = pd.merge(ratio_fare_duration, zones_info, how='left', left_on='DOLocationID', right_on='objectid')
ratio_fare_duration = ratio_fare_duration[["DOLocationID", "ratio", "zone"]]
In [353]:
ratio_fare_duration.head()
Out[353]:
DOLocationID ratio zone
0 148 0.821906 Lower East Side
1 243 1.127733 Washington Heights North
2 31 1.129673 Bronx Park
3 85 0.944509 Erasmus
4 137 0.929687 Kips Bay
In [354]:
fig = px.choropleth(ratio_fare_duration, geojson=zones, 
                    locations='DOLocationID', color='ratio',
                    hover_name = 'zone',
                    labels={'DOLocationID':'Taxi zone', 'ratio': 'Fare/Duration ratio'}     
                   )

fig.update_geos(fitbounds="locations", visible=True)

fig.update_layout(title_text = 'Total fare-Trip duration ratio per dropoff NYC Taxi area')

fig.show()
  1. Build an interactive choropleth with a slider allowing the user to select an hour of day and where the color is a function of question A and question B
In [253]:
pdf_18 = pdf_18.withColumn("Hour", fn.hour("tpep_dropoff_datetime"))
In [254]:
with open('NYCTaxiZones.geojson') as json_file:
    jsdata = json.load(json_file)
In [ ]:
data = dict(
            type='choropleth', # type of map-plot
            colorscale = 'Viridis',
            autocolorscale = False,
            locations = testing['DOLocationID'], # the column with the state
            z = testing['ratio'], # the variable I want to color-code
            geojson = jsdata,
            featureidkey = "properties.objectid",
            colorbar = dict(title = "Ratio")) 
In [ ]:
layout = dict(title = "Ratio values", geo=dict(fitbounds="locations", visible=True))
fig = dict( data=data, layout=layout )
plotly.offline.iplot(fig)
A. average number of dropoffs in the area during that hour of the day
In [255]:
avg_doffs_hour_18 = pdf_18.groupBy("DOLocationID","Hour").count()
In [257]:
#avg_doffs_hour_18.show()
In [265]:
slider3A_df = avg_doffs_hour_18.withColumn("average", fn.col("count")/61)
In [266]:
slider3A_df = slider3A_df.select("DOLocationID", "Hour", "average").toPandas()
In [267]:
slider3A_df.head()
Out[267]:
DOLocationID Hour average
0 196 0 8.688525
1 77 23 1.475410
2 92 2 3.032787
3 162 3 44.131148
4 114 15 127.672131
In [583]:
# Create empty list for data objects:    
data_sliderA = []

for hour in slider3A_df.Hour.unique():

    # Filter the df by hour
    df_hourly = slider3A_df[(slider3A_df['Hour'] == hour )]

    # Create the dictionary with the data for the current hour
    hourly_data = dict(
            type='choropleth', # type of map-plot
            colorscale = 'Viridis',
            locations = df_hourly['DOLocationID'], # the column with the state
            z = df_hourly['average'], # the variable I want to color-code
            geojson = jsdata,
            featureidkey = "properties.objectid") 
    
    data_sliderA.append(hourly_data)  # I add the dictionary to the list of dictionaries for the slider
In [584]:
# Steps for the slider
stepsA = []

for i in range(len(data_slider)):
    step = dict(method='restyle',
                args=['visible', [False] * len(data_slider)],
                label='Hour {}'.format(i)) # label to be displayed for each step (year)
    step['args'][1][i] = True
    stepsA.append(step)

# I create the 'sliders' object from the 'steps' 

slidersA = [dict(active=0, pad={"t": 1}, steps=steps)] 
In [585]:
# I set up the layout (including slider option)
layoutA = dict(geo=dict(fitbounds="locations", visible=True), sliders=slidersA)

# I create the figure object:

figA = dict(data=data_sliderA, layout=layoutA) 

# to plot in the notebook
plotly.offline.plot(figA)

#This will output an HTML file that can be opened in the browser, because opening it in the notebook is too heavy
#plotly.offline.plot(fig, auto_open=True, image = 'png', image_filename="avg_number_dropoffs" ,image_width=2000, image_height=1000, 
 #             filename='sliderA.html', validate=True)
Out[585]:
'temp-plot.html'
B. average ratio of tip over total fare amount for pickups in the area at given hour of the day
In [280]:
ratio_tip_fare = pdf_18.groupBy("DOLocationID","Hour").agg({"total_amount":"sum", "tip_amount":"sum"})
In [281]:
slider3B_df = ratio_tip_fare.withColumn("ratio", fn.col("sum(tip_amount)")/fn.col("sum(total_amount)"))
In [282]:
slider3B_df = slider3B_df.select("DOLocationID", "Hour", "ratio").toPandas()
In [283]:
slider3B_df.head()
Out[283]:
DOLocationID Hour ratio
0 196 0 0.084007
1 77 23 0.055108
2 92 2 0.063089
3 162 3 0.106187
4 114 15 0.115445
In [291]:
# Create empty list for data object:    
data_sliderB = []
In [292]:
for hour in slider3B_df.Hour.unique():

    # I select the hour
    df_hourly = slider3B_df[(slider3B_df['Hour'] == hour )]

    # Create the dictionary with the data for the current hour
    hourly_data = dict(
            type='choropleth', # type of map-plot
            colorscale = 'Viridis',
            autocolorscale = False,
            locations = df_hourly['DOLocationID'], # the column with the state
            z = df_hourly['ratio'], # the variable I want to color-code
            geojson = jsdata,
            featureidkey = "properties.objectid") 
    
    data_sliderB.append(hourly_data)  # I add the dictionary to the list of dictionaries for the slider
In [293]:
##  I create the steps for the slider
stepsB = []

for i in range(len(data_sliderB)):
    step = dict(method='restyle',
                args=['visible', [False] * len(data_slider)],
                label='Hour {}'.format(i)) # label to be displayed for each step (year)
    step['args'][1][i] = True
    stepsB.append(step)

##  I create the 'sliders' object from the 'steps' 

slidersB = [dict(active=0, pad={"t": 1}, steps=stepsB)] 
In [294]:
layoutB = dict(geo=dict(fitbounds="locations", visible=True), sliders=slidersB)

# I create the figure object:
figB = dict(data=data_slider, layout=layoutB) 

# Plot in the notebook
#plotly.offline.iplot(fig)

#This will output an HTML file that can be opened in the browser, because opening it in the notebook is too heavy
plotly.offline.plot(fig, auto_open=True, image = 'png', image_filename="ratio_tip_fare" ,image_width=2000, image_height=1000, 
              filename='sliderB.html', validate=True)
Out[294]:
'sliderB.html'
In [ ]:
spark.stop()